// This file is made available under Elastic License 2.0.
// This file is based on code available under the Apache license here:
//   https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package com.starrocks.analysis;

import com.google.common.base.Preconditions;
import com.starrocks.catalog.AggregateType;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.PrimitiveType;
import com.starrocks.catalog.ScalarType;
import com.starrocks.catalog.Type;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.FeNameFormat;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// Column definition which is generated by SQL syntax parser
// Syntax:
//      name type [key] [agg_type] [NULL | NOT NULL] [DEFAULT default_value] [comment]
// Example:
//      id bigint key NOT NULL DEFAULT "-1" "user id"
//      pv bigint sum NULL DEFAULT "-1" "page visit"
public class ColumnDef {
    private static final Logger LOG = LogManager.getLogger(ColumnDef.class);

    /*
     * User can set default value for a column
     * eg:
     *     k1 INT NOT NULL DEFAULT "10"
     *     k1 INT NULL
     *     k1 INT NULL DEFAULT NULL
     *
     * ColumnnDef will be transformed to Column in Analysis phase, and in Column, default value is a String.
     * No matter does the user set the default value as NULL explicitly, or not set default value,
     * the default value in Column will be "null", so that StarRocks can not distinguish between "not set" and "set as null".
     *
     * But this is OK because Column has another attribute "isAllowNull".
     * If the column is not allowed to be null, and user does not set the default value,
     * even if default value saved in Column is null, the "null" value can not be loaded into this column,
     * so data correctness can be guaranteed.
     */
    public static class DefaultValue {
        public boolean isSet;
        public String value;

        public DefaultValue(boolean isSet, String value) {
            this.isSet = isSet;
            this.value = value;
        }

        // no default value
        public static DefaultValue NOT_SET = new DefaultValue(false, null);
        // default null
        public static DefaultValue NULL_DEFAULT_VALUE = new DefaultValue(true, null);
        public static String ZERO = new String(new byte[] {0});
        // default "value", "0" means empty hll
        public static DefaultValue HLL_EMPTY_DEFAULT_VALUE = new DefaultValue(true, ZERO);
        // default "value", "0" means empty bitmap
        public static DefaultValue BITMAP_EMPTY_DEFAULT_VALUE = new DefaultValue(true, ZERO);
    }

    // parameter initialized in constructor
    private final String name;
    private final TypeDef typeDef;
    private AggregateType aggregateType;
    private boolean isKey;
    private boolean isAllowNull;
    private DefaultValue defaultValue;
    private final String comment;

    public ColumnDef(String name, TypeDef typeDef) {
        this(name, typeDef, false, null, false, DefaultValue.NOT_SET, "");
    }

    public ColumnDef(String name, TypeDef typeDef, boolean isKey, AggregateType aggregateType,
                     boolean isAllowNull, DefaultValue defaultValue, String comment) {
        this.name = name;
        this.typeDef = typeDef;
        this.isKey = isKey;
        this.aggregateType = aggregateType;
        this.isAllowNull = isAllowNull;
        this.defaultValue = defaultValue;
        this.comment = comment;
    }

    public boolean isAllowNull() {
        return isAllowNull;
    }

    public String getDefaultValue() {
        return defaultValue.value;
    }

    public String getName() {
        return name;
    }

    public AggregateType getAggregateType() {
        return aggregateType;
    }

    public void setAggregateType(AggregateType aggregateType) {
        this.aggregateType = aggregateType;
    }

    public boolean isKey() {
        return isKey;
    }

    public void setIsKey(boolean isKey) {
        this.isKey = isKey;
    }

    public TypeDef getTypeDef() {
        return typeDef;
    }

    public Type getType() {
        return typeDef.getType();
    }

    public void analyze(boolean isOlap) throws AnalysisException {
        if (name == null || typeDef == null) {
            throw new AnalysisException("No column name or column type in column definition.");
        }
        FeNameFormat.checkColumnName(name);

        // When string type length is not assigned, it need to be assigned to 1.
        if (typeDef.getType().isScalarType()) {
            final ScalarType targetType = (ScalarType) typeDef.getType();
            if (targetType.getPrimitiveType().isStringType()
                    && !targetType.isAssignedStrLenInColDefinition()) {
                targetType.setLength(1);
            }
        }

        typeDef.analyze(null);

        Type type = typeDef.getType();

        if (isKey && isOlap && !type.isKeyType()) {
            if (type.isFloatingPointType()) {
                throw new AnalysisException(
                        String.format("Invalid data type of key column '%s': '%s', use decimal instead", name, type));
            } else {
                throw new AnalysisException(String.format("Invalid data type of key column '%s': '%s'", name, type));
            }
        }

        // A column is a key column if and only if isKey is true.
        // aggregateType == null does not mean that this is a key column,
        // because when creating a UNIQUE KEY table, aggregateType is implicit.
        if (aggregateType != null && aggregateType != AggregateType.NONE) {
            if (isKey) {
                throw new AnalysisException(
                        String.format("Cannot specify aggregate function '%s' for key column '%s'", aggregateType,
                                name));
            }
            if (!aggregateType.checkCompatibility(type.getPrimitiveType())) {
                throw new AnalysisException(
                        String.format("Invalid aggregate function '%s' for '%s'", aggregateType, name));
            }
        } else if (type.isBitmapType() || type.isHllType() || type.isPercentile()) {
            throw new AnalysisException(String.format("No aggregate function specified for '%s'", name));
        }

        if (type.isHllType()) {
            if (defaultValue.isSet) {
                throw new AnalysisException(String.format("Invalid default value for '%s'", name));
            }
            defaultValue = DefaultValue.HLL_EMPTY_DEFAULT_VALUE;
        }

        if (type.isBitmapType()) {
            if (defaultValue.isSet) {
                throw new AnalysisException(String.format("Invalid default value for '%s'", name));
            }
            defaultValue = DefaultValue.BITMAP_EMPTY_DEFAULT_VALUE;
        }

        // If aggregate type is REPLACE_IF_NOT_NULL, we set it nullable.
        // If defalut value is not set, we set it NULL
        if (aggregateType == AggregateType.REPLACE_IF_NOT_NULL) {
            isAllowNull = true;
            if (!defaultValue.isSet) {
                defaultValue = DefaultValue.NULL_DEFAULT_VALUE;
            }
        }

        if (!isAllowNull && defaultValue == DefaultValue.NULL_DEFAULT_VALUE) {
            throw new AnalysisException(String.format("Invalid default value for '%s'", name));
        }

        if (defaultValue.isSet && defaultValue.value != null) {
            try {
                validateDefaultValue(type, defaultValue.value);
            } catch (AnalysisException e) {
                throw new AnalysisException(String.format("Invalid default value for '%s': %s", name, e.getMessage()));
            }
        }
    }

    public static void validateDefaultValue(Type type, String defaultValue) throws AnalysisException {
        Preconditions.checkNotNull(defaultValue);
        if (type.isComplexType()) {
            throw new AnalysisException(String.format("Default value for complex type '%s' not supported", type));
        }
        ScalarType scalarType = (ScalarType) type;
        // check if default value is valid. if not, some literal constructor will throw AnalysisException
        PrimitiveType primitiveType = scalarType.getPrimitiveType();
        switch (primitiveType) {
            case TINYINT:
            case SMALLINT:
            case INT:
            case BIGINT:
                IntLiteral intLiteral = new IntLiteral(defaultValue, type);
                break;
            case LARGEINT:
                LargeIntLiteral largeIntLiteral = new LargeIntLiteral(defaultValue);
                break;
            case FLOAT:
                FloatLiteral floatLiteral = new FloatLiteral(defaultValue);
                if (floatLiteral.getType().isDouble()) {
                    throw new AnalysisException("Default value will loose precision: " + defaultValue);
                }
            case DOUBLE:
                FloatLiteral doubleLiteral = new FloatLiteral(defaultValue);
                break;
            case DECIMALV2:
            case DECIMAL32:
            case DECIMAL64:
            case DECIMAL128:
                DecimalLiteral decimalLiteral = new DecimalLiteral(defaultValue);
                decimalLiteral.checkPrecisionAndScale(scalarType.getScalarPrecision(), scalarType.getScalarScale());
                break;
            case DATE:
            case DATETIME:
                DateLiteral dateLiteral = new DateLiteral(defaultValue, type);
                break;
            case CHAR:
            case VARCHAR:
            case HLL:
                if (defaultValue.length() > scalarType.getLength()) {
                    throw new AnalysisException("Default value is too long: " + defaultValue);
                }
                break;
            case BITMAP:
                break;
            case BOOLEAN:
                BoolLiteral boolLiteral = new BoolLiteral(defaultValue);
                break;
            default:
                throw new AnalysisException(String.format("Cannot add default value for type '%s'", type));
        }
    }

    public String toSql() {
        StringBuilder sb = new StringBuilder();
        sb.append("`").append(name).append("` ");
        sb.append(typeDef.toSql()).append(" ");

        if (aggregateType != null) {
            sb.append(aggregateType.name()).append(" ");
        }

        if (!isAllowNull) {
            sb.append("NOT NULL ");
        } else {
            // should append NULL to make result can be executed right.
            sb.append("NULL ");
        }

        if (defaultValue.isSet) {
            sb.append("DEFAULT \"").append(defaultValue.value).append("\" ");
        }
        sb.append("COMMENT \"").append(comment).append("\"");

        return sb.toString();
    }

    public Column toColumn() {
        return new Column(name, typeDef.getType(), isKey, aggregateType, isAllowNull, defaultValue.value, comment);
    }

    @Override
    public String toString() {
        return toSql();
    }
}
